package hbasecli.service;

import hbasecli.connection.HBaseConnections;
import hbasecli.exception.ParamException;
import hbasecli.model.json.*;
import hbasecli.model.json.param.CreateRowParam;
import hbasecli.model.bytes.CellInfo;
import hbasecli.model.bytes.RowInfo;
import hbasecli.model.bytes.TableFamilyInfo;
import hbasecli.model.bytes.TableInfo;
import hbasecli.model.json.param.DeleteRowParam;
import hbasecli.model.json.param.QueryParam;
import hbasecli.type.DataType;
import hbasecli.type.DataTypes;
import hbasecli.type.UTF8DataType;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.*;

import static hbasecli.model.json.JsonResponse.success;
import static org.springframework.util.StringUtils.hasText;

/**
 * JSON-facing facade over the byte-oriented operations inherited from
 * {@link HBaseService}.
 *
 * <p>Every public method resolves a {@link Connection} from a connection id,
 * converts the string parameters to {@code byte[]} through the configured
 * {@link DataType}s, delegates to the inherited byte-level overload, and
 * converts the result back to JSON model objects.
 */
@Service
public class HBaseServiceForJson extends HBaseService {

    @Resource
    private HBaseConnections connections;

    @Resource
    private DataTypes dataTypes;

    /**
     * Lists the tables reachable through the given connection.
     *
     * @param connectionId id used to look the connection up in {@link HBaseConnections}
     * @return one {@link TableObject} per table returned by the inherited {@code listTable}
     */
    public List<TableObject> listTable(
            String connectionId
    ) {
        Connection connection = connectionFor(connectionId);
        UTF8DataType utf8 = dataTypes.getUTF8();
        return tableInfoListToTableObjectList(listTable(connection), utf8);
    }

    /**
     * Creates a table with the given column families.
     *
     * @param connectionId   id used to look the connection up
     * @param tableName      name of the table to create
     * @param families       column family names, encoded as UTF-8; must not be {@code null}
     * @param deleteIfExists forwarded unchanged to the inherited {@code createTable}
     * @return the result of the inherited {@code createTable}
     */
    public boolean createTable(
            String connectionId,
            String tableName,
            Collection<String> families,
            boolean deleteIfExists
    ) {
        Connection connection = connectionFor(connectionId);
        UTF8DataType utf8 = dataTypes.getUTF8();
        List<byte[]> familyBytes = new ArrayList<>(families.size());
        for (String family : families) {
            familyBytes.add(utf8.stringToBytes(family));
        }
        return createTable(connection, tableName, familyBytes, deleteIfExists);
    }

    /**
     * Deletes the named tables.
     *
     * @param connectionId  id used to look the connection up
     * @param tableNameList names of the tables to delete
     * @return the result of the inherited {@code deleteTable}
     */
    public boolean deleteTable(
            String connectionId,
            List<String> tableNameList
    ) {
        Connection connection = connectionFor(connectionId);
        return deleteTable(connection, tableNameList);
    }

    /**
     * Reads rows from a table, optionally restricted by a row key pattern.
     *
     * <p>Without a row key pattern the inherited {@code scan} is used; otherwise
     * the pattern is encoded with the row key data type and passed to the
     * inherited {@code fuzzyScan} together with {@code '_'} (presumably the
     * single-character wildcard; confirm against {@code HBaseService}).
     *
     * @param param query parameters; connection id and table name are mandatory
     * @return a success response wrapping the converted rows
     * @throws ParamException if the connection id or table name has no text
     */
    public JsonResponse<List<RowObject>> query(QueryParam param) {
        String connectionId = requireParamText(param.getConnectionId(), "未知的连接");
        String tableName = requireParamText(param.getTableName(), "表名不可以为空值");
        Map<String, String> valueDataTypeMap = param.getValueDataTypeMap();
        if (valueDataTypeMap == null) {
            valueDataTypeMap = Collections.emptyMap();
        }
        Connection connection = connectionFor(connectionId);
        String rowKeyPattern = param.getRowKeyPattern();
        DataType rowKeyDataType = dataTypes.get(param.getRowKeyDataType());
        UTF8DataType utf8 = dataTypes.getUTF8();

        List<RowInfo> rowInfoList;
        if (!hasText(rowKeyPattern)) {
            rowInfoList = scan(
                    connection,
                    tableName,
                    null,
                    null
            );
        } else {
            rowInfoList = fuzzyScan(
                    connection,
                    tableName,
                    rowKeyDataType.stringToBytes(rowKeyPattern),
                    '_',
                    null
            );
        }
        return success(rowInfoListToRowObjectList(rowInfoList, utf8, rowKeyDataType, valueDataTypeMap));
    }

    /**
     * Writes one row made of the complete cells found in {@code param}.
     *
     * <p>A cell is complete when its family, qualifier and value are all
     * non-null. {@code null} cells and incomplete cells are ignored; if nothing
     * remains after that filtering the request is rejected instead of being
     * forwarded as an empty write.
     *
     * @param param row key, row key data type and cells to write
     * @return a success response wrapping the result of the inherited {@code put}
     * @throws ParamException if the connection id or table name has no text, the
     *                        row key is null or empty, or no complete cell is supplied
     */
    public JsonResponse<Boolean> createRow(CreateRowParam param) {
        String connectionId = requireParamText(param.getConnectionId(), "未知的连接");
        String tableName = requireParamText(param.getTableName(), "表名不可以为空值");
        String rowKey = param.getRowKey();
        // Deliberately not hasText(): a whitespace-only row key is accepted here.
        if (rowKey == null || rowKey.isEmpty()) {
            throw new ParamException("行键不可以为空值");
        }
        Collection<CreateRowParam.CellParam> cells = param.getCells();
        if (cells == null || cells.isEmpty()) {
            throw new ParamException("至少要有一条有效的数据");
        }
        Connection connection = connectionFor(connectionId);
        DataType rowKeyDataType = dataTypes.get(param.getRowKeyDataType());
        UTF8DataType utf8 = dataTypes.getUTF8();

        List<byte[][]> familyQualifierValues = new ArrayList<>(cells.size());
        for (CreateRowParam.CellParam cell : cells) {
            if (cell == null) {
                // A null element (e.g. a JSON null inside the array) is treated like an incomplete cell.
                continue;
            }
            DataType valueDataType = dataTypes.get(cell.getValueDataType());
            String family = cell.getFamily();
            String qualifier = cell.getQualifier();
            String value = cell.getValue();
            if (family == null || qualifier == null || value == null) {
                continue;
            }
            familyQualifierValues.add(
                    new byte[][]{
                            utf8.stringToBytes(family),
                            utf8.stringToBytes(qualifier),
                            valueDataType.stringToBytes(value)
                    }
            );
        }
        // The emptiness check above only looks at the raw collection; every cell may
        // still have been filtered out, which would otherwise reach put() with no data.
        if (familyQualifierValues.isEmpty()) {
            throw new ParamException("至少要有一条有效的数据");
        }
        return success(put(connection, tableName, rowKeyDataType.stringToBytes(rowKey), familyQualifierValues));
    }

    /**
     * Deletes the rows identified by the given row keys.
     *
     * <p>{@code null} and empty row keys are skipped. A missing or fully skipped
     * key list results in the inherited {@code deleteRow} being called with an
     * empty list.
     *
     * @param param row keys and the data type used to encode them
     * @return a success response wrapping the result of the inherited {@code deleteRow}
     * @throws ParamException if the connection id or table name has no text
     */
    public JsonResponse<Boolean> deleteRow(DeleteRowParam param) {
        String connectionId = requireParamText(param.getConnectionId(), "未知的连接");
        String tableName = requireParamText(param.getTableName(), "表名不可以为空值");
        Connection connection = connectionFor(connectionId);
        DataType rowKeyDataType = dataTypes.get(param.getRowKeyDataType());

        List<byte[]> encodedRowKeys = new ArrayList<>();
        List<String> rowKeys = param.getRowKeys();
        if (rowKeys != null) {
            for (String rowKey : rowKeys) {
                if (rowKey != null && !rowKey.isEmpty()) {
                    encodedRowKeys.add(rowKeyDataType.stringToBytes(rowKey));
                }
            }
        }
        return success(deleteRow(connection, tableName, encodedRowKeys));
    }

    /**
     * Returns {@code value} if it contains text, otherwise rejects the request.
     *
     * @param value   parameter value to validate
     * @param message message of the {@link ParamException} thrown on failure
     * @return {@code value}, unchanged
     * @throws ParamException if {@code value} is null, empty or whitespace-only
     */
    private static String requireParamText(String value, String message) {
        if (!hasText(value)) {
            throw new ParamException(message);
        }
        return value;
    }

    /**
     * Looks the connection up by id and runs the inherited {@code checkConnection} on it.
     *
     * @param connectionId id used to look the connection up
     * @return the connection returned by {@link HBaseConnections}
     */
    private Connection connectionFor(String connectionId) {
        Connection connection = connections.get(connectionId);
        checkConnection(connection);
        return connection;
    }

    /**
     * Converts byte-level table descriptions to their JSON counterparts.
     *
     * @param tableInfos tables to convert
     * @param utf8       data type used to decode family names and info entries
     * @return one {@link TableObject} per input table, in iteration order
     */
    private List<TableObject> tableInfoListToTableObjectList(
            List<TableInfo> tableInfos,
            UTF8DataType utf8
    ) {
        List<TableObject> tableObjects = new ArrayList<>(tableInfos.size());
        for (TableInfo tableInfo : tableInfos) {
            Collection<TableFamilyInfo> families = tableInfo.getFamilies();
            List<TableFamilyObject> tableFamilyObjects = new ArrayList<>(families.size());
            for (TableFamilyInfo family : families) {
                tableFamilyObjects.add(tableFamilyInfoToTableFamilyObject(family, utf8));
            }
            TableObject tableObject = new TableObject();
            tableObject.setName(tableInfo.getName());
            tableObject.setFamilies(tableFamilyObjects);
            tableObjects.add(tableObject);
        }
        return tableObjects;
    }

    /**
     * Converts one column family, decoding its name and every info entry as UTF-8.
     *
     * @param family family to convert
     * @param utf8   data type used for decoding
     * @return the JSON representation of {@code family}
     */
    private TableFamilyObject tableFamilyInfoToTableFamilyObject(
            TableFamilyInfo family,
            UTF8DataType utf8
    ) {
        Map<byte[], byte[]> info = family.getInfo();
        Map<String, String> decodedInfo = new HashMap<>(info.size());
        for (Map.Entry<byte[], byte[]> entry : info.entrySet()) {
            decodedInfo.put(utf8.bytesToString(entry.getKey()), utf8.bytesToString(entry.getValue()));
        }
        TableFamilyObject tableFamilyObject = new TableFamilyObject();
        tableFamilyObject.setName(utf8.bytesToString(family.getName()));
        tableFamilyObject.setInfo(decodedInfo);
        return tableFamilyObject;
    }

    /**
     * Converts byte-level rows to their JSON counterparts.
     *
     * <p>Family and qualifier are decoded as UTF-8. Each value is decoded with the
     * data type looked up under the key {@code family + ":" + qualifier} in
     * {@code valueDataTypeMap}; a missing entry passes {@code null} to
     * {@code dataTypes.get}.
     *
     * @param rowInfoList      rows to convert
     * @param utf8             data type used to decode family and qualifier
     * @param rowKeyDataType   data type used to decode the row key
     * @param valueDataTypeMap data type name per {@code family:qualifier} column
     * @return one {@link RowObject} per input row, in iteration order
     */
    private List<RowObject> rowInfoListToRowObjectList(
            List<RowInfo> rowInfoList,
            UTF8DataType utf8,
            DataType rowKeyDataType,
            Map<String, String> valueDataTypeMap
    ) {
        List<RowObject> rowObjectList = new ArrayList<>(rowInfoList.size());
        for (RowInfo rowInfo : rowInfoList) {
            Collection<CellInfo> cells = rowInfo.getCells();
            List<CellObject> cellObjects = new ArrayList<>(cells.size());
            for (CellInfo cell : cells) {
                String family = utf8.bytesToString(cell.getFamily());
                String qualifier = utf8.bytesToString(cell.getQualifier());
                DataType valueDataType = dataTypes.get(valueDataTypeMap.get(family + ":" + qualifier));
                CellObject cellObject = new CellObject();
                cellObject.setFamily(family);
                cellObject.setQualifier(qualifier);
                cellObject.setValue(valueDataType.bytesToString(cell.getValue()));
                cellObjects.add(cellObject);
            }
            RowObject rowObject = new RowObject();
            rowObject.setRowKey(rowKeyDataType.bytesToString(rowInfo.getRowKey()));
            rowObject.setCells(cellObjects);
            rowObjectList.add(rowObject);
        }
        return rowObjectList;
    }

}